MedicalImaging笔记(X1) FoDicom传输(二)DICOMService

上一个笔记最后提到SendRequest, 以下是SendRequest的序列图, 不是十分准确, 但可以有个大概的浏览.

序列图当中, SendNextMessage和DoSendMessage是DicomService中两个重要的方法.

先看SendNextMessage方法

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
private void SendNextMessage()
{
while (true)
{
DicomMessage msg;
lock (_lock)
{
if (_sending)
{
break;
}

if (_msgQueue.Count == 0)
{
if (_pending.Count == 0) OnSendQueueEmpty();//
break;
}

if (Association.MaxAsyncOpsInvoked > 0
&& _pending.Count(req => req.Type != DicomCommandField.CGetRequest)
>= Association.MaxAsyncOpsInvoked)
{
break;
}

_sending = true;

msg = _msgQueue.Dequeue();

if (msg is DicomRequest)
{
_pending.Add(msg as DicomRequest);
}
}

DoSendMessage(msg);

lock (_lock) _sending = false;
}
}

里面出现2个数据结构: Queue<DicomMessage> _msgQueue 与 List<DicomRequest> _pending;
当request得到response的时候, 该msg才会从_pending列表中删除,
若_msgQueue和_pending都为空时, 即传输队列为空时, 会调用OnSendQueueEmpty

OnSendQueueEmpty在DicomClient中override

1
2
3
4
5
6
7
8
9
10
11
12
13
/// <summary>
/// Action to perform when send queue is empty.
/// </summary>
protected override void OnSendQueueEmpty()
{
lock (this.locker)
{
if (LingerTask == null || LingerTask.IsCompleted)
{
LingerTask = LingerAsync();
}
}
}

调用了LingerAsync();

linger是逗留的意思, 例如逗留50ms之后释放链接

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
private async Task LingerAsync()
{
while (true)
{
var disconnected = await ListenForDisconnectAsync(client.Linger, false).ConfigureAwait(false);

if (disconnected)
{
SetComplete();
break;
}

if (IsSendQueueEmpty)
{
await DoSendAssociationReleaseRequestAsync().ConfigureAwait(false);
break;
}
}
}

即在Client调用Send之后, 若一段时间内没有新的Request加入, 并且旧的request已经得到响应, 那么Client会释放与服务端的连接.

再看DoSendMessage方法
DoSendMessage方法代码比较多, 主要有3步

  1. 构造Dimse (序列图中省略了)
  2. 构造PDataTFStream, 作为Dimse的成员; DicomService也是PDataTFStream构造方法的参数之一
  3. 调用PDataTFStream的FlushAsync方法

PDataTFStream顾名思义是用来构造读写P-Data-TF, 它的两个方法:

  1. CreatePDVAsync
  2. WritePDVAsync

WritePDVAsync中调用DicomService的SendPDUAsync来真正发送PDU, 写到网络流中
为何SendPDUAsync是在DicomService, 而不是在PDataTFStream中
从代码中看出, SendPDUAsync还需要对PDU进行一些逻辑处理: 拥塞控制
_pduQueue中有最大容量限制

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
protected async Task SendPDUAsync(PDU pdu)
{
// _pduQueueWatcher = new ManualResetEventSlim(true);
// 第一次执行时不会阻塞, _pduQueueWatcher构造成Signaled状态
_pduQueueWatcher.Wait();//阻塞, 等SendNextPDUAysnc完成的信号

lock (_lock)
{
_pduQueue.Enqueue(pdu);
if (_pduQueue.Count >= MaximumPDUsInQueue)
_pduQueueWatcher.Reset();//即使前面Wait通过, 这里也阻塞
}

await SendNextPDUAsync().ConfigureAwait(false);
}
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
private async Task SendNextPDUAsync()
{
while (true)
{
if (!IsConnected) return;

PDU pdu;

lock (_lock)
{
if (_writing) return;

if (_pduQueue.Count == 0) return;

_writing = true;

pdu = _pduQueue.Dequeue();
if (_pduQueue.Count < MaximumPDUsInQueue) _pduQueueWatcher.Set();
}

if (Options.LogDataPDUs && pdu is PDataTF) Logger.Info("{logId} -> {pdu}", LogID, pdu);

var ms = new MemoryStream();
pdu.Write().WritePDU(ms);

var buffer = ms.ToArray();

try
{
await _network.AsStream().WriteAsync(buffer, 0, (int)ms.Length).ConfigureAwait(false);
} Catch(...)
lock (_lock) _writing = false;
}
}
1
2
pdu = _pduQueue.Dequeue();
if (_pduQueue.Count < MaximumPDUsInQueue) _pduQueueWatcher.Set();

ManualResetEventSlim简介

  • A slimmed down version of ManualResetEvent.
  • Better performance than ManualResetEvent when wait times are expected to be very short
  • Wait和Reset的区别
    _pduQueueWatcher = new ManualResetEventSlim(true);//Event is signaled
    • 第一次调用Wait, 通过;
    • 第一次调用Reset, 阻塞; 将Event从Signaled重置为NonSignaled

以上介绍的是SendRequest的大致过程, DicomService中另一大块是接收消息, 下面三个方法是重点
ListenAndProcessPDUAsync
ProcessPDataTFAsync
PerformDimse

具体见代码了